Lifting an electric vehicle charger into the cloud

No Comments

With the increasing popularity and availability of battery electric vehicles, privately-owned charger infrastructure at home or on company premises has become more and more common.

A vehicle charger is much more than a simple mains socket – it contains a computer or at least a programmable logic controller (PLC) which keeps track of power levels, consumption, and state. Having access to this data can be very useful for the owner, for example for custom billing or energy management purposes. Existing proprietary solutions offered by the manufacturers do not necessarily guarantee a full match with the customer’s requirements regarding costs and connectivity.

So, let’s “rescue” the data from the charger and store it in our own infrastructure.

This article serves two primary purposes. First, it outlines a simple solution on how to acquire charger data for your own analysis purposes. Second, it can be used as a tutorial for a tiny IoT use case with Golang, MQTT and TimescaleDB.

Obtaining data from the charger

At the time being, most chargers provide a management interface using a proprietary protocol with varying degrees of documentation. In case of the charger used for this article, a Mennekes Amtron, the control unit had ethernet and wireless connectivity, a mobile app, but no public documentation on the protocol.

In cases like that, some options for proceeding are:

  • Ask the manufacturer for an API documentation
  • Analyze the network communication between management app and charger
  • Reverse-engineer a given piece of software, such as a mobile application

Working together, the manufacturer is always the preferred way, and would surely be no issue on our charger. But supposing that the charger is an exotic model without sophisticated customer support, it might happen that obtaining an official API documentation does not succeed.

In this case, analyzing the network traffic between the mobile application and charger would be a least-invasive and promising next step. After setting up a network monitoring environment, i.e. Wireshark and a switch with monitoring port, and listening to the network traffic while using the mobile app, the following findings have been made:

  • The charger uses a plaintext HTTP API listening on port 25000
  • The API prefix is /MHCP/1.0/
  • A client is considered as authenticated when it adds an http query parameter “devKey” with the value of the “Charger PIN1” from the manual
  • Request and response payload are JSON-encoded with human readable field names

The listing below shows an example of a state document retrieved from an actual charger on /MHCP/1.0/ChargeData?DevKey=0000:

{
  "ChgState": "Idle",
  "Tariff": "T1",
  "Price": 280,
  "Uid": "...",
  "ChgDuration": 11811,
  "ChgNrg": 11337,
  "NrgDemand": 5000,
  "Solar": 0,
  "EmTime": 1440,
  "RemTime": 1440,
  "ActPwr": 0,
  "ActCurr": 16,
  "MaxCurrT1": 16,
  "BeginH_T1": 4,
  "BeginM_T1": 30,
  "PriceT1": 280,
  "MaxCurrT2": 16,
  "BeginH_T2": 22,
  "BeginM_T2": 0,
  "PriceT2": 200,
  "RemoteCurr": 16,
  "SolarPrice": 0,
  "ExcessNrg": false,
  "TMaxCurrT1": 16,
  "TBeginH_T1": 4,
  "TBeginM_T1": 30,
  "TPriceT1": 280,
  "TMaxCurrT2": 16,
  "TBeginH_T2": 22,
  "TBeginM_T2": 0,
  "TPriceT2": 200,
  "TRemoteCurr": 16,
  "TSolarPrice": 0,
  "TExcessNrg": true,
  "HCCP": "A11"
}

As the network analysis revealed all details we needed, it was not necessary to reverse-engineer the mobile application. In cases of a less obvious protocol, it is advisable to look into the traffic a bit deeper, as the charger might use an industrial protocol such as Modbus or even some PLC-specific protocol.

Connecting the charger to the cloud

electric vehicle charger infrastructure

A plain-text http server with cleartext passwords cannot be safely exposed to the internet. Furthermore, having different charger types and interfaces would require our server infrastructure to be aware of multiple different communication protocols and their revisions. To protect the charger from attackers and provide a common protocol for all chargers, we add a tiny edge device which suits as a protocol converter and connectivity provider.

For our use case, this task is done by a Raspberry Pi with a WiFi uplink on the charger’s ethernet port. The protocol converter and forwarding software is a Go binary which:

  • periodically polls the HTTP interface from the PLC
  • connects to an MQTT server in the cloud
  • pushes changed charger state values to the MQTT server

Charger protocol converter

The charger protocol converter’s primary task is to fetch the charger state, map it to a transfer data model and submit it to our cloud.

Fetching the current state from the charger

The code below fetches the current state from the charger via an HTTP call and maps it to a generic charger state struct:

func (m *MennekesMCP) GetCurrentChargerState() (state models.ChargerState, err error) {
    resp, errHttp := http.Get(m.changerEndpoint)

    if errHttp != nil {
        log.WithFields(log.Fields{
            "error": errHttp,
        }).Warn("Unable to receive new Reading from charger")

        err = errHttp
        return
    }

    var chargeDataPDO MennekesChargeDataPDO

    if jsonDecodeError := json.NewDecoder(resp.Body).Decode(&chargeDataPDO); jsonDecodeError == nil {
        log.WithFields(log.Fields{
            "reading": chargeDataPDO,
        }).Trace("Received new Reading from Charger")

        state = models.ChargerState{
            OperationState:        mapMennekesState(chargeDataPDO.ChgState),
            TotalEnergyConsumption: float64(chargeDataPDO.ChgNrg),
            PowerOutput:            float64(chargeDataPDO.ActPwr),
            OutputCurrent:          float64(chargeDataPDO.ActCurr),
            // omitted remaining fields
        }
    } else {
        log.WithFields(log.Fields{
            "jsonDecodeError": jsonDecodeError,
        }).Error("Unable to read data from Mennekes Charger")
        err = jsonDecodeError
    }

    return
}

Forwarding charger data

Having obtained data from the charger, the obvious next step is to publish it to our cloud or any other more powerful system for further processing, such as persistence, analysis, or action derivation.

Although using a synchronous HTTP request for that purpose would theoretically work, a message queue appeared as a better fit, because:

  • we want to add and remove consumers of the charger data without having to modify the protocol converter or to install an additional gateway
  • we expect the next features to require bidirectional communication
  • we need a basic “shadow device” functionality, primarily to avoid unnecessary communication with the charger. The message brokers retain feature serves that purpose very well

Because of its smaller footprint compared to AMQP and its popularity in the IoT field, we use MQTT as messaging protocol.

Besides MQTT support in the connected applications, an MQTT broker is required. Depending on the overall setup, this could be a self-managed MQTT broker (such as Mosquitto or HiveMQ) or a managed broker, i.e. AWS IoT or Azure IoT. In this example, a Mosquitto instance is used.

MQTT topic hierarchy and payload

The MQTT message properties relevant for this case are:

  • the topic for routing and delivery purposes
  • the QoS statement defining the condition on which a message is considered delivered
  • a retain flag, which allows us to declare a message as the current value until a new one is published
  • the payload. MQTT is data-agnostic and regards the payload as an array of bytes

We prefer a topic hierarchy based on charger id and variable name over a single topic for all variables for the following reasons:

  • It reduces the complexity of a payload, because the topic already indicates what is inside
  • It reduces the load on the transport layer, because a fine-grained topic structure allows us to update only the values that have actually changed

Both advantages really come in handy on constrained devices on low bandwidth or expensive connections.

The resulting topic structure is:

chargers/{chargerName as defined}/{valueName as in shared data model}

For example: For a variable “OperationState” on a charger called “carpark1”, the topic is chargers/carpark1/OperationState.

Concerning the payload, a simple solution would be to publish the byte representation of the value (in our case, a 64-bit ANSI float). Supposed that all other applications involved used the same method of floating point representation and don`t care about the time the published value was actually sampled or the latency is extremely low, this might be a viable solution.

In our use case, it is expected that the receiver of a telegram has to be aware of the actual sampling time, for instance for computing integrals over the current power consumption. So we need to include the sampling time into the telegram.

There are several ways to achieve this:

  • Put the timestamp into the topic structure, which results in additional messages and increased complexity
  • Write the sampling timestamp into an MQTT5 header field
  • Instead of the raw value, emit a data structure which contains both timestamp and value

For the given scenario, the third option appears as most reasonable. For the task of encoding a structure into a byte array, one could either use JSON or a data serialization framework, such as AVRO or Google Protocol Buffers. For the sake of simplicity and compatibility with managed solutions (especially their monitoring and debugging tools), we will use plain JSON.

Certificate authentication

MQTT and Mosquitto can be operated with plaintext passwords or even with no passwords at all. In a real-world environment, both edge device and broker should be authenticated and authorized, furthermore it should be possible to isolate device scopes and revoke compromised access credentials.

This requirements are resolved by using certificate-based authentication. The generation and deployment of the certificates might differ depending on the trust infrastructure used in production, for development purposes a self-signed set of certificates might suffice.

After deploying the certificates to the MQTT broker, the Mosquitto broker configuration has to be adapted so that it will only accept MQTT/s connections from trusted devices.

port 8883

cafile /mosquitto/config/certs/rootCA.crt
keyfile /mosquitto/config/certs/server.key
certfile /mosquitto/config/certs/server.crt
tls_version tlsv1.2
require_certificate true
use_identity_as_username true
allow_anonymous false

persistence true
persistence_location /mosquitto/data/

Publishing Charger State

Adding MQTT publisher/subscriber capabilities to a Go application is performed by declaring a dependency to an MQTT library, in our case Eclipse Paho.

After adding an import to github.com/eclipse/paho.mqtt.golang, the project’s go.mod should contain an additional entry: github.com/eclipse/paho.mqtt.golang v1.2.0.

Generating a new MQTT.Client with certificate authentication requires a little more configuration than its plaintext auth counterpart, the following example is inspired by the Paho examples:

// inspired by
// https://github.com/eclipse/paho.mqtt.golang/blob/master/cmd/ssl/main.go
func NewMQTTEmitter(cfg mqtt.Config) (e *MQTTEmitter) {
    certpool := x509.NewCertPool()
    pemCerts, err := ioutil.ReadFile(cfg.TrustedCACertificate)
    if err == nil {
        certpool.AppendCertsFromPEM(pemCerts)
    }

    // Import client certificate/key pair
    cert, err := tls.LoadX509KeyPair(cfg.ClientCertificate, cfg.ClientKey)
    if err != nil {
        panic(err)
    }

    cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
    if err != nil {
        panic(err)
    }
    fmt.Println(cert.Leaf.DNSNames)

    tlsConfig := tls.Config{
        // RootCAs = certs used to verify server cert.
        RootCAs: certpool,
        // ClientAuth = whether to request cert from server.
        // Since the server is set up for SSL, this happens
        // anyways.
        ClientAuth: tls.NoClientCert,
        // ClientCAs = certs used to validate client cert.
        ClientCAs: nil,
        // InsecureSkipVerify = verify that cert contents
        // match server. IP matches what is in cert etc.
        InsecureSkipVerify: false,
        // Certificates = list of certs client sends to server.
        Certificates: []tls.Certificate{cert},
    }

    opts := MQTT.NewClientOptions().
        AddBroker(cfg.Broker).
        SetTLSConfig(&tlsConfig).
        SetClientID(cfg.ClientId)

    client := MQTT.NewClient(opts)

    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    return &MQTTEmitter{client: client, config: cfg}
}

As the MQTT connection is established and ready to use, the next step is to put both connection and data acquisition together.

To avoid publishing redundant values, we only publish values that have changed since the last successful publishing operation by implementing a change detection. To keep the change detection short and avoid redundancy, we use reflection to gather a list of properties of the provided charger state and derive a subset consisting of each item which has changed.

func (e *MQTTEmitter) deriveMessagesFromChangedValues(currentState models.ChargerState, nextState models.ChargerState) (changeMessages []models.ReadyToSendValue) {
    current := reflect.ValueOf(&currentState).Elem()
    next := reflect.ValueOf(&nextState).Elem()
    chargerStateType := current.Type()

    for i := 0; i < chargerStateType.NumField(); i++ {
        fieldName := chargerStateType.Field(i).Name
        currentField := current.FieldByName(fieldName)
        nextField := next.FieldByName(fieldName)

        if changed, currentValue, newValue := hasValueChanged(currentField, nextField); changed {
            changeMessages = append(changeMessages, models.ReadyToSendValue{
                Payload: models.ChargerValueEnvelope{
                    SamplingTime: time.Now(),
                    Value:        newValue,
                },
                Topic: e.config.Topic + fieldName,
            })
        }
    }

    return
}

The following function publishes the changes to the MQTT bus:

func (e *MQTTEmitter) EmitChargerState(sampletime time.Time, nextState models.ChargerState) {
    dueMessages := e.deriveMessagesFromChangedValues(e.previouslyEmittedState, nextState)
    e.previouslyEmittedState = nextState

    for _, message := range dueMessages {
        payloadJson, _ := json.Marshal(message.Payload)

        log.WithFields(log.Fields{
            "sampletime": sampletime,
            "topic":      message.Topic,
            "value":      message.Payload.Value,
        }).Debug("Emitting new State")

        e.client.Publish(message.Topic, 0, retain: true, payloadJson)
    }
}

As the publisher flags the messages as retained, the broker can permanently respond to new subscriptions with a “shadow” of the last known values.

Storing data

Having maintained a representation of the current state of the charger, the historic data reported by the chargers should be persisted in a database prior to further processing. Depending on the environment the system is operating in, this could either be a managed solution (such as Azure CosmosDB or AWS RDS) or a self-managed solution. For this tutorial, we use TimescaleDB, a SQL time-series database based on PostgresDB with 100% downwards compatibility.

TimescaleDB / PostgresDB

After installing Timescaledb, we define the schema:

CREATE TYPE OperationState as enum(
    'unknown',
    'off',
    'idle',
    'charging',
    'scheduled_downtime',
    'unscheduled_downtime'
);

CREATE TABLE public.ev_readings
(
    time TIMESTAMPTZ NOT NULL,
    OperationState OperationState,
    TotalEnergyConsumption double precision,
    PowerOutput double precision,
    OutputCurrent double precision,
    MaximumOutputCurrent double precision,
    ConnectedVehicle varchar(32)
);

Then, in case of TimescaleDb, we upgrade the table to a Hypertable:

SELECT create_hypertable('public.ev_readings', 'time')

A Timescale hypertable offers faster time-series queries and additional querying and reporting functionality.

Picking up and storing data from MQTT

Reading new values from the MQTT bus is done by setting up a subscription, consisting of the topic (or expression) to listen to and the handler function.

The implementation below subscribes to every topic below the chargerRootTopic, such as chargers/carpark1 and passes HandleNewMessage as handler function.

subscriber := &MQTTSubscriber{
        client:  client,
        config:  cfg,
        handler: HandleNewMessage,
        context: ctx}

subscriber.chargerRootTopic = chargerRootTopic + "/#"
subscriber.client.Subscribe(chargerRootTopic, 0, subscriber.HandleNewMessage)

The handling function has to deserialize the message and act as an adapter to the actual domain specific handler:

func (m *MQTTSubscriber) HandleNewMessage(client MQTT.Client, message MQTT.Message) {
    var chargerReading models.ChargerValueEnvelope
    messageReader := bytes.NewReader(message.Payload())

    if errDecodeJson := json.NewDecoder(messageReader).Decode(&chargerReading); errDecodeJson == nil {
        readingName := strings.Replace(message.Topic(), m.chargerRootTopic+"/", "", -1)

        log.WithFields(log.Fields{
            "charger": message.Topic(),
            "msg":     message.MessageID(),
            "reading": chargerReading,
        }).Trace("Received new Reading from Charger")

        m.handler.Handle(readingName, chargerReading)

    } else {
        log.Warn("Unable to decode incoming state json telegram")
    }
}

For the scope of this article, we just save the acquired data to the database:

func (p *Persistor) Handle(readingName string, reading models.ChargerValueEnvelope) error {
    log.WithFields(log.Fields{
        "readingName": readingName,
        "value" : reading.Value,
    }).Debug("Handling new charger message")

    conn, err := p.database.Acquire(context.Background())

    if err != nil {
        log.WithFields(log.Fields{
            "error": err,
        }).Warn("Error while connecting to db")
        return err
    }
    defer conn.Release()

    sanitizer := regexp.MustCompile(`[^a-zA-Z]`)
    sanitizedReadingName := sanitizer.ReplaceAll([]byte(readingName), []byte(""))

    sql := fmt.Sprintf("INSERT INTO public.ev_readings " +
        "(time, %s) VALUES ($1, $2)", string(sanitizedReadingName))

    samplingTimeUtc := reading.SamplingTime.UTC()
    _, errInsert := conn.Exec(context.Background(), sql, samplingTimeUtc, reading.Value)

    if errInsert != nil {
        log.WithFields(log.Fields{
            "error": errInsert,
        }).Warn("Error while inserting data")
        return errInsert
    }

    return nil
}

After running both data acquisition and persistor while a vehicle is connected and charging, the database table should look similar to the table below:

electric vehicle charger infrastructure table

Summary & next steps

This tutorial covered basic concepts of MQTT messaging design for an electric vehicle charger infrastructure, protocol analysis on unknown devices, database setup and putting it all together with the Go programming language.

Having a constant stream of updated state messages and a database with the entire history, this setup can act as a starting point for extensions such as dashboards or rule engines.